/*
 * Copyright (c) 2009, Michael van der Westhuizen
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *    - Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *    - Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */
#ifndef MQCONSUMER_MCVDW_20090517_C
#define MQCONSUMER_MCVDW_20090517_C

#include "imq.h"

#include "mqstatus.c"
#include "util.c"
#include "mqdestination.c"
#include "mqmessage.c"
#include "mqproducer.c"
#include "mqconsumer-callbacks.c"



/*
 * Close the underlying MQ consumer handle, if it is still open.
 *
 * The handle is detached from `self` (and `self->closed` set) before the
 * MQ close call is issued, so the object is already marked closed while
 * the call is in progress.
 *
 * self          the consumer wrapper to close.
 * raise_errors  when non-zero, the close status is passed to
 *               setExceptionFromStatus(); if that reports a failure the
 *               handle and the `closed` flag are restored and 0 is returned.
 *               When zero the close status is ignored (best effort).
 *
 * Returns 1 on success (or when already closed), 0 on failure.
 */
static int mqcrt_MQConsumer_close_consumer(mqcrt_MQConsumer *self, int raise_errors)
{
    MQConsumerHandle detached;
    MQStatus close_status;

    /* Nothing to do for a consumer that was closed earlier. */
    if (self->closed) {
        return 1;
    }

    /* Take the handle away from the object before closing it. */
    COPY_HANDLE(detached, self->consumer);
    INIT_HANDLE(self->consumer);
    self->closed = 1;

    /* An invalid handle has nothing to close. */
    if (!(HANDLE_IS_VALID(detached))) {
        return 1;
    }

    close_status = MQCloseMessageConsumer(detached);

    if (raise_errors && setExceptionFromStatus(close_status)) {
        /* Close failed: put the object back into its previous state. */
        self->closed = 0;
        COPY_HANDLE(self->consumer, detached);
        return 0;
    }

    return 1;
}


/*
 * tp_traverse: report the Python objects owned by this consumer to the
 * cyclic garbage collector. Only `self->session` is visited.
 *
 * Returns 0, or the non-zero value returned by `visit`.
 */
static int mqcrt_MQConsumer_traverse(mqcrt_MQConsumer *self, visitproc visit, void *arg)
{
    int visit_result;

    DPRINTF(("%s\n", "INFO: mqcrt_MQConsumer_traverse: entry"));

    if (self->session) {
        visit_result = visit((PyObject *)(self->session), arg);
        if (visit_result) {
            return visit_result;
        }
    }

    /* NOTE(review): objects held through self->callback_data are not
     * visited here - confirm whether they can take part in cycles. */
    DPRINTF(("%s\n", "INFO: mqcrt_MQConsumer_traverse: exit"));
    return 0;
}


/*
 * tp_clear: drop the reference to the owning session so that reference
 * cycles through this object can be broken. Always returns 0.
 */
static int mqcrt_MQConsumer_clear(mqcrt_MQConsumer *self)
{
    PyObject * released;

    DPRINTF(("%s\n", "INFO: mqcrt_MQConsumer_clear: entry"));

    /* Null the field first, then release, so the object never exposes a
     * pointer to a session that is being torn down. */
    released = (PyObject *)(self->session);
    self->session = NULL;
    Py_XDECREF(released);

    /* NOTE(review): objects held through self->callback_data are not
     * released here - confirm whether they can take part in cycles. */
    DPRINTF(("%s\n", "INFO: mqcrt_MQConsumer_clear: exit"));
    return 0;
}


/*
 * tp_dealloc: release everything owned by the consumer and free it.
 *
 * Order of operations:
 *   1. stop GC tracking (the type has Py_TPFLAGS_HAVE_GC, so the collector
 *      must not traverse the object while its fields are being torn down),
 *   2. close the MQ consumer handle, ignoring any close error,
 *   3. drop the session reference,
 *   4. release the callable/args held by the listener data and free it,
 *   5. hand the memory back through tp_free.
 */
static void mqcrt_MQConsumer_dealloc(mqcrt_MQConsumer *self)
{
    PyObject * tmp;
    messageListenerData * callback_data;

    DPRINTF(("%s\n", "INFO: mqcrt_MQConsumer_dealloc: entry"));

    /* Required for GC-enabled types before any field is invalidated. */
    PyObject_GC_UnTrack((PyObject *)self);

    /* Best effort: errors cannot be reported from a deallocator. */
    mqcrt_MQConsumer_close_consumer(self, 0);

    /* Detach owned pointers from the object before releasing them. */
    tmp = self->session;
    self->session = NULL;

    callback_data = (messageListenerData *)self->callback_data;
    self->callback_data = NULL;

    Py_XDECREF(tmp);
    tmp = NULL;

    if (NULL != callback_data) {
        Py_CLEAR(callback_data->callable);
        Py_CLEAR(callback_data->args);
        PyMem_Free(callback_data);
        callback_data = NULL;
    }

    self->ob_type->tp_free((PyObject*)self);
    DPRINTF(("%s\n", "INFO: mqcrt_MQConsumer_dealloc: exit"));
}


/*
 * tp_new: allocate a consumer object in its empty state - invalid MQ
 * handle, not closed, no session and no listener data.
 *
 * Returns the new object, or NULL if tp_alloc failed.
 */
static PyObject * mqcrt_MQConsumer_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
{
    mqcrt_MQConsumer * consumer;
    DPRINTF(("%s\n", "INFO: mqcrt_MQConsumer_new: entry"));

    consumer = (mqcrt_MQConsumer *)type->tp_alloc(type, 0);

    if (NULL != consumer) {
        consumer->callback_data = NULL;
        consumer->session = NULL;
        consumer->closed = 0;
        INIT_HANDLE(consumer->consumer);

        DPRINTF(("%s\n", "INFO: mqcrt_MQConsumer_new: exit"));
    }

    return (PyObject *)consumer;
}


/*
 * tp_init: reset the consumer to an empty state. `args` and `kwds` are not
 * inspected.
 *
 * Any open MQ consumer handle is closed first (close errors are raised),
 * then the session reference and the listener data are released.
 *
 * Returns 0 on success, -1 if closing the existing handle failed.
 */
static int mqcrt_MQConsumer_init(mqcrt_MQConsumer *self, PyObject *args, PyObject *kwds)
{
    messageListenerData * listener;
    PyObject * old_session;
    DPRINTF(("%s\n", "INFO: mqcrt_MQConsumer_init: entry"));

    if (0 == mqcrt_MQConsumer_close_consumer(self, 1)) {
        return -1;
    }

    /* Detach both owned pointers before releasing either of them. */
    listener = (messageListenerData *)self->callback_data;
    self->callback_data = NULL;
    old_session = self->session;
    self->session = NULL;

    Py_XDECREF(old_session);
    old_session = NULL;

    if (listener != NULL) {
        /* Drop the Python objects the listener holds, then the struct. */
        Py_CLEAR(listener->callable);
        Py_CLEAR(listener->args);
        PyMem_Free(listener);
        listener = NULL;
    }

    DPRINTF(("%s\n", "INFO: mqcrt_MQConsumer_init: exit"));
    return 0;
}


/*
 * MQConsumer.close(): close the message consumer.
 *
 * Returns None on success; returns NULL when
 * mqcrt_MQConsumer_close_consumer() reports failure.
 */
static PyObject * mqcrt_MQConsumer_close(mqcrt_MQConsumer *self, PyObject *args, PyObject *kwds)
{
    PyObject * result = NULL;

    if (mqcrt_MQConsumer_close_consumer(self, 1)) {
        Py_INCREF(Py_None);
        result = Py_None;
    }

    return result;
}


/*
 * MQConsumer.receive_message_wait(): receive a message, waiting until one
 * is available.
 *
 * A fresh MQMessage object is created, bound to this consumer's session,
 * and filled in by MQReceiveMessageWait() with the GIL released.
 *
 * Returns the new MQMessage, or NULL if the message object could not be
 * created or setExceptionFromStatus() reports a failure for the receive
 * status.
 */
static PyObject * mqcrt_MQConsumer_receive_message_wait(mqcrt_MQConsumer *self, PyObject *args, PyObject *kwds)
{
    mqcrt_MQMessage * msg;
    MQStatus status;

    if (NULL == (msg = (mqcrt_MQMessage *)PyObject_CallObject((PyObject *)&mqcrt_MQMessageType, NULL))) {
        return NULL;
    }

    /* The message shares (and co-owns) the consumer's session. */
    msg->session = self->session;
    Py_XINCREF(msg->session);

    /* Pin both objects while the GIL is released ... */
    Py_INCREF((PyObject *)msg);
    Py_INCREF((PyObject *)self);
    Py_BEGIN_ALLOW_THREADS
    status = MQReceiveMessageWait(self->consumer, &msg->message);
    Py_END_ALLOW_THREADS
    /* ... and drop the temporary pins again (the original code took a
     * second pair of references here, leaking both objects). */
    Py_DECREF((PyObject *)self);
    Py_DECREF((PyObject *)msg);

    if (setExceptionFromStatus(status)) {
        Py_CLEAR(msg);
        return NULL;
    }

    return (PyObject *)msg;
}


/*
 * MQConsumer.receive_message_with_timeout(timeout=1000): receive a message,
 * waiting until one is available or the timeout has expired.
 *
 * timeout  optional integer passed to MQReceiveMessageWithTimeout() as its
 *          timeout argument; defaults to 1000.
 *
 * Returns the new MQMessage, None when the status code is
 * MQ_TIMEOUT_EXPIRED, or NULL on argument errors, message creation failure,
 * or when setExceptionFromStatus() reports a failure.
 */
static PyObject * mqcrt_MQConsumer_receive_message_with_timeout(mqcrt_MQConsumer *self, PyObject *args, PyObject *kwds)
{
    MQInt32 timeoutMilliSeconds;
    mqcrt_MQMessage * msg;
    MQStatus status;
    /* The "i" format unit stores a C int, so parse into an int and convert
     * afterwards instead of writing through an MQInt32 pointer. */
    int timeout_arg = 1000;

    static char *kwlist[] = {"timeout", NULL};

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|i", kwlist, &timeout_arg)) {
        return NULL;
    }

    timeoutMilliSeconds = (MQInt32)timeout_arg;

    if (NULL == (msg = (mqcrt_MQMessage *)PyObject_CallObject((PyObject *)&mqcrt_MQMessageType, NULL))) {
        return NULL;
    }

    /* The message shares (and co-owns) the consumer's session. */
    msg->session = self->session;
    Py_XINCREF(msg->session);

    /* Pin both objects while the GIL is released ... */
    Py_INCREF((PyObject *)msg);
    Py_INCREF((PyObject *)self);
    Py_BEGIN_ALLOW_THREADS
    status = MQReceiveMessageWithTimeout(self->consumer, timeoutMilliSeconds, &msg->message);
    Py_END_ALLOW_THREADS
    /* ... and drop the temporary pins again (the original code took a
     * second pair of references here, leaking both objects). */
    Py_DECREF((PyObject *)self);
    Py_DECREF((PyObject *)msg);

    /* A timeout is not an error: report it as None. */
    if (MQ_TIMEOUT_EXPIRED == MQGetStatusCode(status)) {
        Py_CLEAR(msg);
        Py_RETURN_NONE;
    }

    if (setExceptionFromStatus(status)) {
        Py_CLEAR(msg);
        return NULL;
    }

    return (PyObject *)msg;
}


/*
 * MQConsumer.receive_message_nowait(): receive a message without waiting.
 *
 * Returns the new MQMessage, None when the status code is MQ_NO_MESSAGE,
 * or NULL if the message object could not be created or
 * setExceptionFromStatus() reports a failure for the receive status.
 */
static PyObject * mqcrt_MQConsumer_receive_message_nowait(mqcrt_MQConsumer *self, PyObject *args, PyObject *kwds)
{
    mqcrt_MQMessage * msg;
    MQStatus status;

    if (NULL == (msg = (mqcrt_MQMessage *)PyObject_CallObject((PyObject *)&mqcrt_MQMessageType, NULL))) {
        return NULL;
    }

    /* The message shares (and co-owns) the consumer's session. */
    msg->session = self->session;
    Py_XINCREF(msg->session);

    /* Pin both objects while the GIL is released ... */
    Py_INCREF((PyObject *)msg);
    Py_INCREF((PyObject *)self);
    Py_BEGIN_ALLOW_THREADS
    status = MQReceiveMessageNoWait(self->consumer, &msg->message);
    Py_END_ALLOW_THREADS
    /* ... and drop the temporary pins again (the original code took a
     * second pair of references here, leaking both objects). */
    Py_DECREF((PyObject *)self);
    Py_DECREF((PyObject *)msg);

    /* "No message" is not an error: report it as None. */
    if (MQ_NO_MESSAGE == MQGetStatusCode(status)) {
        Py_CLEAR(msg);
        Py_RETURN_NONE;
    }

    if (setExceptionFromStatus(status)) {
        Py_CLEAR(msg);
        return NULL;
    }

    return (PyObject *)msg;
}


/*
 * Method table for imq.MQConsumer. The last field of each entry is the
 * docstring shown to Python callers. The table is terminated by a
 * {NULL} sentinel entry.
 */
static PyMethodDef mqcrt_MQConsumer_methods[] = {
    {"receive_message_wait", (PyCFunction)mqcrt_MQConsumer_receive_message_wait, METH_VARARGS | METH_KEYWORDS, "Receive a message. Waits until a message is available."},
    {"receive_message_with_timeout", (PyCFunction)mqcrt_MQConsumer_receive_message_with_timeout, METH_VARARGS | METH_KEYWORDS, "Receive a message. Waits until a message is available or a timeout has expired."},
    {"receive_message_nowait", (PyCFunction)mqcrt_MQConsumer_receive_message_nowait, METH_NOARGS, "Receive a message. Returns None if no message is available."},
    {"close", (PyCFunction)mqcrt_MQConsumer_close, METH_NOARGS, "Close the message consumer."},
    {NULL}
};

/*
 * Type object for imq.MQConsumer.
 *
 * The initializer is positional, so the order of the entries must match the
 * PyTypeObject slot layout exactly. The type is subclassable
 * (Py_TPFLAGS_BASETYPE) and takes part in cyclic garbage collection
 * (Py_TPFLAGS_HAVE_GC) through the traverse/clear functions above.
 */
static PyTypeObject mqcrt_MQConsumerType = {
    PyObject_HEAD_INIT(NULL)
    0,                                       /* ob_size */
    "imq.MQConsumer",                        /* tp_name */
    sizeof(mqcrt_MQConsumer),                /* tp_basicsize */
    0,                                       /* tp_itemsize */
    (destructor)mqcrt_MQConsumer_dealloc,    /* tp_dealloc */
    0,                                       /* tp_print */
    0,                                       /* tp_getattr */
    0,                                       /* tp_setattr */
    0,                                       /* tp_compare */
    0,                                       /* tp_repr */
    0,                                       /* tp_as_number */
    0,                                       /* tp_as_sequence */
    0,                                       /* tp_as_mapping */
    0,                                       /* tp_hash */
    0,                                       /* tp_call */
    0,                                       /* tp_str */
    0,                                       /* tp_getattro */
    0,                                       /* tp_setattro */
    0,                                       /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_GC, /* tp_flags */
    "MQConsumer objects",                    /* tp_doc */
    (traverseproc)mqcrt_MQConsumer_traverse, /* tp_traverse */
    (inquiry)mqcrt_MQConsumer_clear,         /* tp_clear */
    0,                                       /* tp_richcompare */
    0,                                       /* tp_weaklistoffset */
    0,                                       /* tp_iter */
    0,                                       /* tp_iternext */
    mqcrt_MQConsumer_methods,                /* tp_methods */
    0,                                       /* tp_members */
    0,                                       /* tp_getset */
    0,                                       /* tp_base */
    0,                                       /* tp_dict */
    0,                                       /* tp_descr_get */
    0,                                       /* tp_descr_set */
    0,                                       /* tp_dictoffset */
    (initproc)mqcrt_MQConsumer_init,         /* tp_init */
    0,                                       /* tp_alloc */
    mqcrt_MQConsumer_new,                    /* tp_new */
    0,                                       /* tp_free */
};

#endif
